Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove wakers for cancelled tasks #42

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

progval
Copy link
Contributor

@progval progval commented Nov 11, 2023

When an async function does:

async f(&mut self) {
    tokio::select! {
        v = self.subscriber().next => { /* do something with v */ }
        _ = std::future::ready() => {},
    }
}

then the future returned by self.subscriber().next is cancelled, but the observed object stilled referenced the waker, preventing the future (consequently, the function's closure) from being dropped even though it won't be scheduled again.

This change is twofold:

  1. ObservableState is now handed Weak references, so it does not keep futures alive, and a strong reference is kept by whichever object is held by the future awaiting it (Subscriber or Next)
  2. ObservableState garbage-collects weak references from time to time, so its own vector of wakers does not grow unbounded

Many thanks for having this log line:

tracing::debug!("Waking up {num_wakers} waiting subscribers");

I spent my day trying to figure why my app was leaking 1MB/s and Waking up XXXX waiting subscribers in my logs made me figure it out :)

@progval progval force-pushed the leak-cancelled-future branch 2 times, most recently from 439ddf1 to 1fd6d5f Compare November 11, 2023 17:58
When an async function does:

```
async f(&mut self) {
    tokio::select! {
        v = self.subscriber().next => { /* do something with v */ }
        _ = std::future::ready() => {},
    }
}
```

then the future returned by `self.subscriber().next` is cancelled, but
the observed object stilled referenced the waker, preventing the future
(consequently, the function's closure) from being dropped even though
it won't be scheduled again.

This change is twofold:

1. `ObservableState` is now handed `Weak` references, so it does not
   keep futures alive, and a strong reference is kept by whichever
   object is held by the future awaiting it (`Subscriber` or `Next`)
2. `ObservableState` garbage-collects weak references from time to time,
   so its own vector of wakers does not grow unbounded
@jplatte
Copy link
Owner

jplatte commented Nov 11, 2023

Hey, thanks for this PR!

Is this pattern used in other channel implementations? I'm surprised something like this is necessary.

@progval
Copy link
Contributor Author

progval commented Nov 11, 2023

The only other channel I'm familiar with is tokio::mpsc, and its equivalent is to return Err when .send()ing to a channel with no subscriber

@jplatte
Copy link
Owner

jplatte commented Nov 11, 2023

Hm, that does not make sense to me, how is that equivalent. All wakers in eyeball are already consumed when "sending" (setting a new value on the observable), even without this change.

@progval
Copy link
Contributor Author

progval commented Nov 11, 2023

Without this change, they are consumed only when sending. So if we never send (or take a long time between sends) then wakers accumulate.

@jplatte
Copy link
Owner

jplatte commented Nov 11, 2023

Yes, but it sounds like tokio's mpsc suffers from the same problem then?

@progval
Copy link
Contributor Author

progval commented Nov 11, 2023

hmm okay actually, tokio::sync::broadcast. It's built on top of a linked list of messages shared by sender and receiver:

https://github.com/tokio-rs/tokio/blob/49eb26f159c839fb34f4ce964759692443ca1302/tokio/src/sync/broadcast.rs#L306-L307

and a (sorted) linked list recording the position and waker of each receiver:

https://github.com/tokio-rs/tokio/blob/49eb26f159c839fb34f4ce964759692443ca1302/tokio/src/sync/broadcast.rs#L312-L313

https://github.com/tokio-rs/tokio/blob/49eb26f159c839fb34f4ce964759692443ca1302/tokio/src/sync/broadcast.rs#L330-L331

https://github.com/tokio-rs/tokio/blob/49eb26f159c839fb34f4ce964759692443ca1302/tokio/src/sync/broadcast.rs#L354-L360

and when a receiver is dropped, it removes itself from the list, including the waker:

https://github.com/tokio-rs/tokio/blob/49eb26f159c839fb34f4ce964759692443ca1302/tokio/src/sync/broadcast.rs#L1332-L1334

(it's not clear to me why the receiver being dropped is always at the tail of the list; i guess what it calls "tail" is actually a pointer to the link that was the tail at the time the receiver was created and the list is actually doubly-linked)

@progval
Copy link
Contributor Author

progval commented Nov 11, 2023

Alternatively, we could probably remove all the manual waker management from eyeball and use tokio::sync::watch instead. Unfortunately there is no equivalent in the stdlib, so it would mean making dep:tokio/sync non-optional

@jplatte
Copy link
Owner

jplatte commented Nov 11, 2023

eyeball was originally implemented on top of tokio, but that was limiting in a number of ways. I guess we should probably do something like tokio's waker list, but I'm not sure if there are any crates out that that implement this, and it feels a bit wrong to put it right in eyeball.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants